{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Logistic Regression Code Along\n",
    "This is a code along of the famous titanic dataset, its always nice to start off with this dataset because it is an example you will find across pretty much every data analysis language."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "spark = SparkSession.builder.appName('myproj').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "data = spark.read.csv('titanic.csv',inferSchema=True,header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- PassengerId: integer (nullable = true)\n",
      " |-- Survived: integer (nullable = true)\n",
      " |-- Pclass: integer (nullable = true)\n",
      " |-- Name: string (nullable = true)\n",
      " |-- Sex: string (nullable = true)\n",
      " |-- Age: double (nullable = true)\n",
      " |-- SibSp: integer (nullable = true)\n",
      " |-- Parch: integer (nullable = true)\n",
      " |-- Ticket: string (nullable = true)\n",
      " |-- Fare: double (nullable = true)\n",
      " |-- Cabin: string (nullable = true)\n",
      " |-- Embarked: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "data.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['PassengerId',\n",
       " 'Survived',\n",
       " 'Pclass',\n",
       " 'Name',\n",
       " 'Sex',\n",
       " 'Age',\n",
       " 'SibSp',\n",
       " 'Parch',\n",
       " 'Ticket',\n",
       " 'Fare',\n",
       " 'Cabin',\n",
       " 'Embarked']"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "my_cols = data.select(['Survived',\n",
    " 'Pclass',\n",
    " 'Sex',\n",
    " 'Age',\n",
    " 'SibSp',\n",
    " 'Parch',\n",
    " 'Fare',\n",
    " 'Embarked'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "my_final_data = my_cols.na.drop()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Working with Categorical Columns\n",
    "\n",
    "Let's break this down into multiple steps to make it all clear."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import (VectorAssembler,VectorIndexer,\n",
    "                                OneHotEncoder,StringIndexer)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "gender_indexer = StringIndexer(inputCol='Sex',outputCol='SexIndex')\n",
    "gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "embark_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkIndex')\n",
    "embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "assembler = VectorAssembler(inputCols=['Pclass',\n",
    " 'SexVec',\n",
    " 'Age',\n",
    " 'SibSp',\n",
    " 'Parch',\n",
    " 'Fare',\n",
    " 'EmbarkVec'],outputCol='features')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import LogisticRegression"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pipelines \n",
    "\n",
    "Let's see an example of how to use pipelines (we'll get a lot more practice with these later!)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "pipeline = Pipeline(stages=[gender_indexer,embark_indexer,\n",
    "                           gender_encoder,embark_encoder,\n",
    "                           assembler,log_reg_titanic])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "train_titanic_data, test_titanic_data = my_final_data.randomSplit([0.7,.3])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "fit_model = pipeline.fit(train_titanic_data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "results = fit_model.transform(test_titanic_data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import BinaryClassificationEvaluator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',\n",
    "                                       labelCol='Survived')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+----------+\n",
      "|Survived|prediction|\n",
      "+--------+----------+\n",
      "|       0|       1.0|\n",
      "|       0|       1.0|\n",
      "|       0|       1.0|\n",
      "|       0|       1.0|\n",
      "|       0|       0.0|\n",
      "|       0|       1.0|\n",
      "|       0|       1.0|\n",
      "|       0|       0.0|\n",
      "|       0|       0.0|\n",
      "|       0|       0.0|\n",
      "|       0|       0.0|\n",
      "|       0|       0.0|\n",
      "|       0|       0.0|\n",
      "|       0|       0.0|\n",
      "|       0|       0.0|\n",
      "|       0|       0.0|\n",
      "|       0|       0.0|\n",
      "|       0|       1.0|\n",
      "|       0|       1.0|\n",
      "|       0|       1.0|\n",
      "+--------+----------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "results.select('Survived','prediction').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "AUC = my_eval.evaluate(results)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.7918269230769232"
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "AUC"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Great Job!"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [conda root]",
   "language": "python",
   "name": "conda-root-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
